defmodule Logflare.Backends.Adaptor.HttpBased.Pipeline do
  @moduledoc """
  Common Broadway pipeline for HTTP-based adaptors sending log batches using Tesla client.

  Events are produced by `Logflare.Backends.BufferProducer`, routed to the `:http`
  batcher and handed, in batches of up to 250 events, to
  `Logflare.Backends.Adaptor.HttpBased.Client.send_events/5`.

  The module also acts as the `Broadway.Acknowledger` for the messages it creates
  in `transform/2`.
  """

  use Broadway

  @behaviour Broadway.Acknowledger

  alias Logflare.Backends.Adaptor.HttpBased
  alias Broadway.Message
  alias Logflare.Backends
  alias Logflare.Backends.Backend
  alias Logflare.Backends.BufferProducer
  alias Logflare.Sources
  alias Logflare.Sources.Source
  alias Logflare.Utils

  @doc """
  Starts the pipeline for the given `source` / `backend` pair.

  `client` is stored in the Broadway context and is later passed as the first
  argument to `HttpBased.Client.send_events/5` by `handle_batch/4`.
  """
  @spec start_link(Source.t(), Backend.t(), module()) :: Broadway.on_start()
  def start_link(source, backend, client) do
    Broadway.start_link(__MODULE__,
      name: Backends.via_source(source.id, __MODULE__, backend.id),
      hibernate_after: 5_000,
      spawn_opt: [
        fullsweep_after: 100
      ],
      producer: [
        module:
          {BufferProducer,
           [
             backend_id: backend.id,
             source_id: source.id
           ]},
        transformer: {__MODULE__, :transform, []},
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 3, min_demand: 1]
      ],
      batchers: [
        http: [concurrency: 6, batch_size: 250]
      ],
      # Only ids and tokens are kept in the context; `handle_batch/4` looks the
      # backend and source up in their caches for every batch.
      context: %{
        source_id: source.id,
        backend_id: backend.id,
        source_token: source.token,
        backend_token: backend.token,
        client: client
      }
    )
  end

  @doc """
  Builds the name of a Broadway child process.

  `base_name` is appended to the identifier tuple carried inside the pipeline's
  `:via` name. See the implementation of `Backends.via_source` for how tuples are
  used to identify child processes.
  """
  @impl Broadway
  def process_name({:via, module, {registry, identifier}}, base_name) do
    new_identifier = Utils.append_to_tuple(identifier, base_name)
    {:via, module, {registry, new_identifier}}
  end

  @doc """
  Routes every message to the `:http` batcher without modifying its data.
  """
  @impl Broadway
  def handle_message(_processor_name, message, _context) do
    Message.put_batcher(message, :http)
  end

  @doc """
  Sends one batch of events through the client stored in `context`.

  Emits the `[:logflare, :backends, :pipeline, :handle_batch]` telemetry event,
  resolves the backend and source from their caches and calls
  `HttpBased.Client.send_events/5`. The messages are returned unchanged.
  """
  @impl Broadway
  def handle_batch(:http, messages, batch_info, context) do
    :telemetry.execute(
      [:logflare, :backends, :pipeline, :handle_batch],
      %{batch_size: batch_info.size, batch_trigger: batch_info.trigger},
      %{
        backend_type: :http_based
      }
    )

    backend = Backends.Cache.get_backend(context.backend_id)
    events = for %{data: le} <- messages, do: le
    source = Sources.Cache.get_by_id(context.source_id)

    metadata =
      %{
        "source_id" => context.source_id,
        "source_uuid" => context[:source_token],
        "backend_id" => context.backend_id,
        "backend_uuid" => context[:backend_token]
      }

    # NOTE(review): the result of send_events/5 is discarded, so every message in
    # the batch is returned as-is (not marked failed) regardless of the outcome.
    HttpBased.Client.send_events(context.client, events, backend, source, metadata)

    messages
  end

  @doc """
  Broadway transformer for the custom producer.

  Wraps a produced `event` in a `Broadway.Message` acknowledged by this module.
  """
  @spec transform(term(), term()) :: Message.t()
  def transform(event, _opts) do
    %Message{
      data: event,
      acknowledger: {__MODULE__, :ack_id, :ack_data}
    }
  end

  @doc """
  Acknowledges processed messages.

  Currently a no-op. Returns `:ok`, as the `Broadway.Acknowledger` contract
  specifies.
  """
  @impl Broadway.Acknowledger
  @spec ack(term(), [Message.t()], [Message.t()]) :: :ok
  def ack(_ack_ref, _successful, _failed) do
    # TODO: re-queue failed
    :ok
  end
end
